﻿/**
* CRL
*/
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Linq;
using System.Threading.Tasks;
using CRL.EventBus.Queue;
using CRL.Core;

namespace CRL.EventBus
{

    public class SubscribeService
    {
        static Dictionary<string, EventDeclare> eventRegister = new Dictionary<string, EventDeclare>();
        QueueConfig queueConfig;
        internal static Action<MsgPackage> _OnEventDataRemove;
        internal static SubscribeService _instance;
#if NETSTANDARD 
        IServiceProvider provider;
        public SubscribeService(QueueConfig _queueConfig, IServiceProvider _provider)
        {
            queueConfig = _queueConfig;
            provider = _provider;
            _instance = this;
        }
#endif
        public SubscribeService(QueueConfig _queueConfig)
        {
            queueConfig = _queueConfig;
            _instance = this;
        }
        public static EventDeclare GetEventDeclare(string name)
        {
            eventRegister.TryGetValue(name, out EventDeclare ed);
            return ed;
        }
        internal static void Register(QueueConfig _queueConfig, Type type, bool fromNetCoreInjection = false)
        {
            var atr = type.GetCustomAttribute(typeof(SubscribeAttribute));
            if (atr == null)
            {
                return;
            }
            var methods = type.GetMethods();
            foreach (var m in methods)
            {
                var atr2 = m.GetCustomAttribute(typeof(SubscribeAttribute));
                if (atr2 == null)
                {
                    continue;
                }
                var ed = CreateEventDeclare(_queueConfig, atr2 as SubscribeAttribute, m);
                ed.ServiceInstanceType = type;
                if (!fromNetCoreInjection)
                {
                    ed.ServiceInstanceCtor2 = DynamicMethodHelper.CreateCtorFunc<Func<object>>(type, Type.EmptyTypes);
                }

                if (eventRegister.ContainsKey(ed.Name))
                {
                    return;
                    //throw new Exception($"已注册过相同的事件名 {ed.Name}");
                }

                eventRegister.Add(ed.Name, ed);
            }
        }
        static bool typeRegistered;
        internal static void SetTypeRegistered()
        {
            typeRegistered = true;
        }
        /// <summary>
        /// 按注册程序集，启动订阅
        /// </summary>
        public void StartSubscribe()
        {
            if (!typeRegistered && queueConfig.SubscribeAssemblies != null)
            {
                foreach (var assembyle in queueConfig.SubscribeAssemblies)
                {
                    var types = assembyle.GetTypes();
                    foreach (var type in types)
                    {
                        Register(queueConfig, type);
                    }
                }
                SetTypeRegistered();
            }
            //var queueConfig = QueueConfig.GetConfig();
            foreach (var ed in eventRegister.Values)
            {
                var queue = QueueFactory.GetSubQueueClient(queueConfig, ed);
                //ed.IQueue = queue;
#if NETSTANDARD 
                if (provider != null && queueConfig.fromNetCoreInjection)
                {
                    ed.ServiceInstanceCtor2 = () =>
                    {
                        if (ed.FromManual)
                        {
                            return null;
                        }
                        var instance = provider.GetService(ed.ServiceInstanceType);
                        return instance;
                    };//core 传入服务实例化委托
                }
#endif
            }

            foreach (var ed in eventRegister.Values)
            {
                var queue = QueueFactory.GetSubQueueClient(queueConfig, ed);
                //Console.WriteLine($"SubscribeService {ed.MQType} {ed.Name} 开始消费");
                if (ed.IsAsync)
                {
                    queue.SubscribeAsync(ed);
                }
                else
                {
                    queue.Subscribe(ed);
                }
                log($"启动订阅 {queue.GetType().Name} IsAsync:{ed.IsAsync} Name:{ed.Name}({ed.SubscribeAttribute.Remark}) DataType:{ed.EventDataType.Name} QueueName:{ed.SubscribeAttribute.QueueName}");
                System.Threading.Thread.Sleep(10);
                //Console.WriteLine($"{ed} 开始消费");
            }
            QueueFactory.ConfirmConsume();
        }
        #region 手动指定订阅
        static void log(string msg)
        {
            Console.WriteLine(msg);
            EventLog.Log(msg, "EventBus");
        }
        /// <summary>
        /// 手动订阅
        /// </summary>
        /// <typeparam name="TArgs"></typeparam>
        /// <param name="attr"></param>
        /// <param name="func"></param>
        public void AddSubscribe<TArgs>(SubscribeAttribute attr, Func<TArgs, object> func)
        {
            AddSubscribe(attr, func, null, false);
        }
        /// <summary>
        /// 异步订阅
        /// </summary>
        /// <typeparam name="TArgs"></typeparam>
        /// <param name="attr"></param>
        /// <param name="func"></param>
        public void AddSubscribeAsync<TArgs>(SubscribeAttribute attr, Func<TArgs, Task<object>> func)
        {
            AddSubscribe(attr, func, null, true);
        }
        internal void AddSubscribe<TArgs>(SubscribeAttribute attr, Func<TArgs, object> func, AbsQueue queue, bool isAsync)
        {
            var ed = CreateEventDeclare<TArgs>(attr, (i, b) =>
            {
                var v = b.First();
                if (v is TArgs)
                {
                    return func((TArgs)v);
                }
                throw new Exception($"发布和订阅类型冲突 subName:{attr.Name} {typeof(TArgs)}=>{v.GetType()}");
            });
            ed.IsAsync = isAsync;
            ed.FromManual = true;
            eventRegister.Add(ed.Name, ed);
            if (queue != null)
            {
                //var queue = QueueFactory.GetSubQueueClient(queueConfig, ed);
                //Console.WriteLine($"SubscribeService {ed.MQType} {ed.Name} 开始消费");
                if (ed.IsAsync)
                {
                    queue.SubscribeAsync(ed);
                }
                else
                {
                    queue.Subscribe(ed);
                }
            }
        }
        EventDeclare CreateEventDeclare<TArgs>(SubscribeAttribute attr, Func<object, object[], object> func)
        {
            var key = attr?.Name;
            var eventDataType = typeof(TArgs);
            if (string.IsNullOrEmpty(key))
            {
                throw new Exception("name不能为空");
            }
            key = queueConfig.getEventName(key);
            var isArry = typeof(System.Collections.IEnumerable).IsAssignableFrom(eventDataType) && eventDataType != typeof(string);

            var ed = new EventDeclare()
            {
                EventDataType = eventDataType,
                Name = key,
                MethodInvoke = func,
                //IsAsync = isAsync,
                IsArray = isArry,
                SubscribeAttribute = attr,
            };
            return ed;
        }
        #endregion
        static EventDeclare CreateEventDeclare(QueueConfig _queueConfig, SubscribeAttribute attr, MethodInfo method)
        {
            var key = attr?.Name;
            if (string.IsNullOrEmpty(key))
            {
                throw new Exception("name不能为空");
            }
            key = _queueConfig.getEventName(key);
            var func = Core.DynamicMethodHelper.CreateMethodInvoker(method);
            var args1 = method.GetParameters().FirstOrDefault();
            if (args1 == null)
            {
                throw new Exception("至少一个参数");
            }
            var eventDataType = args1.ParameterType;
            var isArry = typeof(System.Collections.IEnumerable).IsAssignableFrom(eventDataType) && eventDataType != typeof(string);
            var isAsync = method.ReturnType == typeof(Task);

            var ed = new EventDeclare()
            {
                EventDataType = eventDataType,
                Name = key,
                MethodInvoke = func,
                IsAsync = isAsync,
                IsArray = isArry,
                SubscribeAttribute = attr,
            };
            return ed;
        }
        public static void OnEventDataRemove(MsgPackage data)
        {
            _OnEventDataRemove?.Invoke(data);
        }
        public void StopSubscribe()
        {
            QueueFactory.DisposeAll();
        }
    }
}
